package t20240513;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.command.RedisCommand;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.ExecutionException;

import static org.apache.flink.streaming.connectors.redis.config.RedisValidator.REDIS_COMMAND;

/**
 * Manual end-to-end check that pipes rows from a {@code datagen} source table into a Redis
 * sink table using the {@code ZADD} command.
 *
 * <p>Requires a reachable Redis server matching the connection constants below; nothing is
 * asserted by the program itself, the result has to be inspected in Redis.
 */
public class Test3 {

    /** Host of the Redis server the sink table connects to. */
    private static final String REDIS_HOST = "127.0.0.1";

    /** Port of the Redis server the sink table connects to. */
    private static final String REDIS_PORT = "6379";

    // NOTE(review): credential is hard-coded in source; acceptable only for a local
    // throw-away instance. Move to configuration before pointing at a shared server.
    private static final String REDIS_PASSWORD = "yy2024";

    /**
     * Registers the source and sink tables, submits the {@code INSERT} job and blocks until
     * that job has finished.
     *
     * @param args ignored
     * @throws ExecutionException if the submitted job fails
     * @throws InterruptedException if the thread is interrupted while waiting for the job
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Configuration flinkConf = new Configuration();
        flinkConf.setString("rest.port", "9091");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        env.setParallelism(2);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source: 'index' is a sequence from 1 to 10 emitted at one row per second,
        // 'mm' is a generated string of length 5.
        String datagen = "CREATE TABLE datagen (" +
                "    index INT," +
                "    mm VARCHAR" +
                "  ) WITH (" +
                "    'connector' = 'datagen'," +
                "    'rows-per-second'='1'," +
                "    'fields.index.kind'='sequence'," +
                "    'fields.index.start'='1'," +
                "    'fields.index.end'='10'," +
                "    'fields.mm.length'='5'" +
                "  )";

        // Sink: single-node Redis, command ZADD, with 'zset.zremrangeby' set to LEX.
        // NOTE(review): rem_begin / rem_end presumably provide the range bounds for that
        // option -- confirm against the Redis connector documentation.
        String ddl =
                "create table sink_redis(key VARCHAR, score INT, mm VARCHAR, " +
                        "rem_begin VARCHAR, rem_end VARCHAR" +
                        ",primary key (key) not ENFORCED" +
                        ") with ( " +
                        "'connector'='redis', "
                        + "'host'='" + REDIS_HOST
                        + "','port'='" + REDIS_PORT
                        + "', 'redis-mode'='single','password'='" + REDIS_PASSWORD
                        + "','"
                        + "zset.zremrangeby' = 'LEX','"
                        + REDIS_COMMAND + "'='" + RedisCommand.ZADD
                        + "')";

        tEnv.executeSql(datagen);
        tEnv.executeSql(ddl);

        // Every row goes to the same key 'test_zadd' with score 100; the member is 'aa'
        // followed by the generated string.
        String sql = " insert into sink_redis select 'test_zadd' AS key, 100 as score, concat('aa', mm) mm, 'aa' as " +
                "rem_begin ,'bb' as rem_end " +
                "from datagen";
        TableResult tableResult = tEnv.executeSql(sql);

        // executeSql only submits the INSERT job; without waiting, main could return while
        // the job is still running. The sequence field ends at 10, so the job is expected
        // to finish and await() to return.
        tableResult.await();
    }
}
